{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ⭐ 1- Intro to Data Cleaning and Preparation ⭐\n",
    "In this chapter, you will: \n",
    "\n",
    "•\tExercise 1: Load data into a Spark DataFrame (DF) \n",
    "\n",
    "•\tExercise 2: Query the DF using SQL to get a feel for the data \n",
    "\n",
    "•\tExercise 3: Filter and transform the Data \n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession.builder \\\n",
    "    .master('local[*]') \\\n",
    "    .appName(\"Intro\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercise 1: Load the data:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.csv ('training_bot_data.csv', header= True) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### What is the size of the data? use count() function "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# understand what the data size is:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Immutability\n",
    "\n",
    "DataFrame in Spark is **immutable**.\n",
    "\n",
    "What does that mean?\n",
    "It means that every action we do on DataFrame doesn't change the actual DataFrame!\n",
    "\n",
    "Instead, it creates a new DataFrame.\n",
    "Run the next commands and get a feel for working with DataFrame.\n",
    "\n",
    "Don't worry if you don't understand everything completely, the next exercises go deeper into it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.limit(2) .toPandas ()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_new = df.select('bot')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_new.limit(2) .toPandas ()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You probably notice that `df_new`, and `df` are different!\n",
    "They are pointers to two different DataFrames.\n",
    "\n",
    "Try the next command:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.select('bot').limit(2) .toPandas ()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.limit(2) .toPandas ()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The last `toPandas ()` commands printed different results, \n",
    "\n",
    "### why?\n",
    "\n",
    "`df.select('bot')` functionality returns pointer to a new immutable DataFrame! AHA!\n",
    "\n",
    "Let's have `df_new` and `df` point to the same DataFrame.\n",
    "By doing this, we release the pointer from `df_new` and it can be erased from memory.\n",
    "\n",
    "If we wish to have access to it again, we will need to rerun the logic.\n",
    "Bare that in mind for working with `Apache Spark`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_new = df\n",
    "df_new.limit(2) .toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By the Way! `limit(2)` returns a pointer to a DataFrame with 2 rows.\n",
    "\n",
    "Interesting! This is what **Immutability** means!! "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "## Exercise 2: Get a feel for the data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Look at 2 records from the DataFrame to understand the values better before filter: use take() function\n",
    "\n",
    "```python\n",
    "df.take(insert an integer here)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Check out the schema structure of the DataFrame.\n",
    "\n",
    "What are the types of columns?\n",
    "Use:\n",
    "\n",
    "```python\n",
    "df.printSchema()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the next function:\n",
    "\n",
    "```python\n",
    "df.limit(5) .toPandas ()\n",
    "```\n",
    "\n",
    "What happened here? `toPandas` function took the Spark DataFrame and translated it into Pandas DataFrame.\n",
    "\n",
    "#### Run this function only on small data sets and when exploring the data. \n",
    "#### Otherwise, you might run out of memory! \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "How many lines have missing values? run the next command to figure it out! \n",
    "\n",
    "```python\n",
    "import pyspark.sql.functions as f\n",
    "from functools import reduce\n",
    "df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))).count()\n",
    "```\n",
    "\n",
    "> [functools](https://docs.python.org/3/library/functools.html) is a python 3 library.\n",
    "> \n",
    "> [reduce](https://docs.python.org/3/library/functools.html?highlight=reduce#functools.reduce) is part of functools, it takes two arguments: x and y, and produce cumulative items of iterable - in our case: `x | y`\n",
    "> `|` is python OR operator, we concat x and y functionality with OR operator\n",
    "\n",
    "> For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5)\n",
    "\n",
    "\n",
    "Run only reduce function and check the output:\n",
    "\n",
    "`reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))`\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You created a concatenation of `OR` operators with `IS NULL` functionality for all the columns!\n",
    "\n",
    "Now, put it together:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark.sql.functions as f\n",
    "from functools import reduce\n",
    "\n",
    "reducePhrase = reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))\n",
    "\n",
    "df.where(reducePhrase).count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Distinct Value\n",
    "\n",
    "Get the sum  of `id` distinct values, it should be equal to the size of the data \n",
    "\n",
    "Try both `id` and `id_str` fields.\n",
    "\n",
    "Use the next code and adjust it according to the field:\n",
    "\n",
    "```pythob\n",
    "df.select(\"field_name\").distinct().count()\n",
    "```\n",
    "\n",
    "\n",
    "What happened here? Is it in the same size of the data set?\n",
    "Don't worry; We fix that soon!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Is Null\n",
    "\n",
    "How many rows have null on the `screen_name` column?\n",
    "\n",
    "Use the `where` with col `.isNull` function to get the DataFrame with null value for `column_name`.\n",
    "\n",
    "Count it! Use the count method for that.\n",
    "\n",
    "Code sample:\n",
    "```python\n",
    "df.where(f.col('column_name').isNull()).count()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<details><summary>Answer</summary>\n",
    "<p>\n",
    "\n",
    "#### 5\n",
    "\n",
    "</p>\n",
    "</details>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Standard Deviation\n",
    "\n",
    "As part of exploring the data phase, the standard deviation(stddev) is a must!\n",
    "\n",
    "Calculate **stddev** for `followers_count`.\n",
    "\n",
    "### Notice! \n",
    "Some rows have None/Null for `followers_count`, we can:\n",
    "\n",
    "1. Ignore and not calculate the stddev for them\n",
    "\n",
    "**OR** \n",
    "\n",
    "2. Give them a default value\n",
    "\n",
    "**OR** \n",
    "\n",
    "3. Filter them entirely out of our training data."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Start with counting how many rows has null for followers_count:\n",
    "\n",
    "Run this:\n",
    "```python\n",
    "df.where(f.col('followers_count').isNull()).count()\n",
    "```\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We go with:  `2. Give them a default value`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Give deafult values with - Fill null values - fillna()\n",
    "\n",
    "Give the null cells a default value:\n",
    "Using [fillna](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna)\n",
    "\n",
    "Notice the matching type request.\n",
    "Meaning, if a column is of type string, we will need a default value of type string.\n",
    "At the moment, all are fields are of type string.\n",
    "\n",
    "Code sample:\n",
    "```python\n",
    "df_defaultvalue = df.fillna({'column_name':'0'})\n",
    "```\n",
    "\n",
    "<details><summary>Answer</summary>\n",
    "<p>\n",
    "\n",
    "df_defaultvalue = df.fillna({'followers_count':'0'})\n",
    "\n",
    "</p>\n",
    "</details>\n",
    "\n",
    "Remember to valide yourself with count:\n",
    "\n",
    "```python\n",
    "df_defaultvalue.where(f.col('followers_count').isNull()).count()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2nd phase of **standard deviation** calculation is:\n",
    "\n",
    "Casting data to numbers!\n",
    "\n",
    "Cast it to integer:\n",
    "\n",
    "In the code sample, replace the `column_name` with `followers_count`:\n",
    "```python\n",
    "from pyspark.sql.types import IntegerType\n",
    "\n",
    "data_df = df_defaultvalue.withColumn(\"column_name\", df_defaultvalue[\"column_name\"].cast(IntegerType()))\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calculate Standard Deviation! \n",
    "\n",
    "Use `pyspark.sql.function` methods, [here are the docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)\n",
    "\n",
    "Check out **describe** functionality. it provides us `count`, `mean`, `stddev`, `min` and `max` calculations in one function!\n",
    "\n",
    "**Remember** - Use the last DataFrame that you created, with the casting and default values."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "`describe` can take any field, or calculate statistics for all fields.\n",
    "\n",
    "Code Example:\n",
    "```python\n",
    "df.describe(['age']).show()\n",
    "df.describe().toPandas().transpose()\n",
    "\n",
    "```\n",
    "\n",
    "In the code example, Change `age` to `followers_count` and run it!\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This data is dirty! "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Have you noticed a weird behavior with `id` and `id_str`?\n",
    "\n",
    "Run `.distinct().count()` on each, and count how many blank values there are there.\n",
    "\n",
    "Who has the most distinct values? Is it the same as the DataFrame?\n",
    "\n",
    "\n",
    "Use the code sample and remember to replace column name accordinly\n",
    "```python\n",
    "df.select(\"id_str\").distinct().count()\n",
    "df.select(\"id\").distinct().count()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You probably discovered that we couldn't trust `id` nor `id_str` !\n",
    "\n",
    "Oops! What should we do? Do we need them at all?\n",
    "\n",
    "Continue to Excercise 3! "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercise 3: Filter the DataFrame "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You have reached the last section of cleaning and preparing the data 🎊\n",
    "\n",
    "\n",
    "In this exercise - you filter, cast, and add a default value to necessary fields using the Spark functionality.\n",
    "\n",
    "You are going to use the DataFrame that you created in chapters (2,3, and 4!)📙\n",
    "\n",
    "Follow the instructions. For any questions, please use 👉 the Q&A chat.  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "\n",
    "Start with casting:\n",
    "Run the next commands:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import IntegerType, BooleanType\n",
    "\n",
    "casted_df = data_df.withColumn(\"friends_count\", data_df[\"friends_count\"].cast(IntegerType()))\n",
    "casted_df = casted_df.withColumn(\"listed_count\", casted_df[\"listed_count\"].cast(IntegerType()))\n",
    "casted_df = casted_df.withColumn(\"favourites_count\", casted_df[\"favourites_count\"].cast(IntegerType()))\n",
    "casted_df = casted_df.withColumn(\"statuses_count\", casted_df[\"statuses_count\"].cast(IntegerType()))\n",
    "casted_df = casted_df.withColumn(\"verified\", casted_df[\"verified\"].cast(BooleanType()))\n",
    "casted_df = casted_df.withColumn(\"default_profile\", casted_df[\"default_profile\"].cast(BooleanType()))\n",
    "casted_df = casted_df.withColumn(\"has_extended_profile\", casted_df[\"has_extended_profile\"].cast(BooleanType()))\n",
    "casted_df = casted_df.withColumn(\"default_profile_image\", casted_df[\"default_profile_image\"].cast(BooleanType()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What happened here? check it out:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "casted_df.limit(5) .toPandas()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Happy with the results? Run this:\n",
    "df = casted_df "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :** \n",
    "\n",
    "#### When and withColumn functionality\n",
    "\n",
    "Let's fix the weird behavior of `id_str` and `id` fields.\n",
    "\n",
    "\n",
    "Now that we know that there are some blanks values for `id_str`, let's try to fill them out with `id` values.\n",
    "\n",
    "For achiving that, we will use the `when` functions:\n",
    "\n",
    "```python\n",
    "from pyspark.sql.functions import when\n",
    "new_df = df.select(when(df['age'].isNull(), 3).otherwise(df['age']))\n",
    "\n",
    "```\n",
    "\n",
    "Use `when` with the `withColumn` functionality:\n",
    "\n",
    "```python\n",
    "from pyspark.sql.functions import withColumn\n",
    "new_df = df.withColumn('age2', df.age + 2)\n",
    "\n",
    "```\n",
    "\n",
    "\n",
    "Put `where` and `withColumn` together:\n",
    "\n",
    "```python\n",
    " new_df = df.withColumn('new_column_name',when(df['column_that_we_check'].isNotNull(),df['colum_if_true']).\n",
    "                        otherwise(df['column_if_false']))\n",
    "```\n",
    "\n",
    "\n",
    "Replace **age** column from the examples with `id_str` and `id` according to the needs.\n",
    "\n",
    "\n",
    "**Remember!** DataFrame is an immutable object. Each function on DataFrame that transform it creates another DataFrame and returns a pointer to the new one. Remember to work with your latest DataFrame and validate yourself! \n",
    "\n",
    "\n",
    "\n",
    "[Docs for when](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when), \n",
    "[Docs for withColumn](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.withColumn)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import when\n",
    "# your code goes here\n",
    "# test = .."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<details><summary>Hint</summary>\n",
    "<p>\n",
    "\n",
    "Use the isNull function with when, like this:\n",
    "    \n",
    "```python\n",
    "when(df['id_str'].isNull(),df['id']).otherwise(df['id_str'])\n",
    "\n",
    "```  \n",
    "\n",
    "</p>\n",
    "</details>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<details><summary>Answer</summary>\n",
    "<p>\n",
    "    \n",
    "```python\n",
    "from pyspark.sql.functions import when\n",
    "test = df.withColumn('id_str',when(df['id'].isNotNull(),df['id']).otherwise(df['id_str']))\n",
    "\n",
    "```  \n",
    "</p>\n",
    "</details>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Happy with the results? save erase your old DataFrame with the next command:\n",
    "\n",
    "df = test"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Validate your output! Now the distinct count for field `id_str` should be higher"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# validate yourself\n",
    "df.select(\"id_str\").distinct().count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :** \n",
    "\n",
    "#### drop functionality\n",
    "\n",
    "Drop column `id` with drop function:\n",
    "    \n",
    "```python\n",
    "   new_df = df.drop('column_name')\n",
    "```\n",
    "\n",
    "And validate the schema for the new DataFrame\n",
    "\n",
    "```python\n",
    "  new_df.schema\n",
    "```\n",
    "\n",
    "After validating the new DataFrame, overwrite the reference to clear memory:\n",
    "\n",
    "```python\n",
    "   df = new_df\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "new_df = df.drop('id')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = new_df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :**  \n",
    "\n",
    "Drop the next fields:\n",
    "    * `default_profile_image`\n",
    "    * `has_extended_profile`\n",
    "    * `url`\n",
    "    * `created_at`\n",
    "    \n",
    "    You can drop field by field, or provide all the fields to drop in one function call.\n",
    "    \n",
    "```python\n",
    "    updated_df = df.drop('age','history','another_column_to_drop')\n",
    "```\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "**Remember** to validate yourself with the schema and overwrite the DataFrame reference"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# run this:\n",
    "\n",
    "new_df = df.drop('default_profile_image','has_extended_profile','url','created_at','lang')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# What did you get? Happy with the results?\n",
    "\n",
    "new_df.limit(15) .toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = new_df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :**  \n",
    "\n",
    "#### Drop duplicates and describe functionality\n",
    "\n",
    "Sometimes, we get duplicated data accidentally, drop all duplicated by using the \n",
    "`dropDuplicates` function:\n",
    "    \n",
    "```python\n",
    "    new_df = df.dropDuplicates()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# new_df = ... your code goes here\n",
    "new_df.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use `describe` to understand how the data looks like now.\n",
    "Remember that describe works only on numerical values.\n",
    "\n",
    "\n",
    "Use the next code sample and adjust it to your needs:\n",
    "```python\n",
    "new_df.describe('column_name').show()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "new_df.describe('favourites_count').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Happy with the results?\n",
    "\n",
    "df = new_df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The Machine Learning algorithm you are going to use doesn't take text/string as input. \n",
    "\n",
    "Hence, we transfer String columns to boolean or numerical.\n",
    "\n",
    "\n",
    "Turn all String columns to boolean or numerical, if not possible, drop them."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :** \n",
    "\n",
    "Most of our data can be translated to _Integer_ , 1 for exist, 0 for non-exist.\n",
    "\n",
    "Implement that logic for the next columns:\n",
    "    * location\n",
    "    * status\n",
    "    * screen_name\n",
    "    * name\n",
    "    \n",
    "    \n",
    "    \n",
    "Code sample:\n",
    "```python\n",
    "df = df.withColumn('column_name',when(df['column_name'].isNull(),0).otherwise(1))\n",
    "```\n",
    "\n",
    "Run the next command to make it happen! \n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.withColumn('location',when(df['location'].isNull(),0).otherwise(1))\n",
    "df = df.withColumn('status',when(df['status'].isNull(),0).otherwise(1))\n",
    "df = df.withColumn('screen_name',when(df['screen_name'].isNull(),0).otherwise(1))\n",
    "df = df.withColumn('name',when(df['name'].isNull(),0).otherwise(1))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ Task :\n",
    "\n",
    "Adapt `bot` column. \n",
    "\n",
    "`bot` is the data classification column, which indicated if the row represents a bot or not. \n",
    "\n",
    "1. Cast it into Integer.\n",
    "2. Set 1 or 0: 1 for bot and 0 for none bot.\n",
    "\n",
    "If we don't know what it is, use 0.\n",
    "\n",
    "Run the next commands, and remember to validate yourself!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 1. cast\n",
    "df_bot = df.withColumn('bot',df['bot'].cast(IntegerType()))\n",
    "df_bot.limit(5) .toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 2. set 1 or 0\n",
    "df_bot = df_bot.withColumn('bot',when(df_bot['bot'].isNull(),0).otherwise(df_bot['bot']))\n",
    "df_bot.limit(5) .toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Do the same with the other booelan fields:\n",
    "    Run next commends:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_bot = df_bot.withColumn('verified',df['verified'].cast(IntegerType()))\n",
    "df_bot = df_bot.withColumn('default_profile',df_bot['default_profile'].cast(IntegerType()))\n",
    "\n",
    "df_bot = df_bot.withColumn('verified',when(df_bot['verified'].isNull(),0).otherwise(df_bot['verified']))\n",
    "df_bot = df_bot.withColumn('default_profile',when(df_bot['default_profile'].isNull(),0).otherwise(df_bot['default_profile']))\n",
    "\n",
    "df_bot.limit(5) .toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "How many bots and none bots we have in the data?\n",
    "\n",
    "Run the next command to check out! "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_bot.where(df['bot']==0).count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_bot.where(df['bot']==1).count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Happy with the results? \n",
    "\n",
    "df = df_bot"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :** \n",
    "\n",
    "#### drop N/A functionality - dropna()\n",
    "\n",
    "\n",
    "`dropna` functionality is dropping rows where the column given value is null.\n",
    "\n",
    "\n",
    "1. Drop `id_str` column completly\n",
    "2. Drop rows with N/A for `description`:\n",
    "\n",
    "Code example:\n",
    "```python\n",
    "df_new = df.drop('id_str')\n",
    "\n",
    "# in order to avoid errors, drop rows with null/None or N/A for description\n",
    "df_new = df_new.dropna(subset=['description'])\n",
    "# validate yourself\n",
    "df_new.count()\n",
    "\n",
    "```\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df_new = your code goes here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Happy with the results?\n",
    "\n",
    "df = df_new"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run the next commend, we will need it for chapter number 4\n",
    "sub = df.selectExpr('description','bot as label')\n",
    "sub.write.parquet(\"train_data_only_description\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Save updated DataFrame to file**\n",
    "To optimize, speed up queries, and maintain schema information, save the DataFrame as a parquet file. \n",
    "\n",
    ">Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :** \n",
    "\n",
    "#### User Define Function - udf functionality\n",
    "\n",
    "`description` is the only string column left.\n",
    "Spark pattern mining algorithm takes an Array of unique Strings as in input.\n",
    "\n",
    "Hence, for executing pattern mining on description, you implement a function that takes description column string\n",
    "and turns it into an Array of unique Strings.\n",
    "\n",
    "For doing it, you will create a UDF - user define function.\n",
    "\n",
    "\n",
    "\n",
    "Code example for guidence:\n",
    "\n",
    "```python\n",
    "from pyspark.sql.types import ArrayType, StringType\n",
    "from pyspark.sql.functions import udf\n",
    "\n",
    "def split_and_set(some_str):\n",
    "    return {your python code goes here}\n",
    "\n",
    "# connect everything together: \n",
    "# set the udf\n",
    "list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))\n",
    "\n",
    "# call udf from withColumn:\n",
    "new_df = df.withColumn('description', list_udf(df['description']))\n",
    "\n",
    "#validate yourself!\n",
    "new_df.take(2)\n",
    "\n",
    "#all good?\n",
    "df = new_df\n",
    "```\n",
    "\n",
    "\n",
    "You might get errors, in the exception log stack,\n",
    "search for `AnalysisException` and try to understand the problem.\n",
    "\n",
    "Try to think about what will happen if you run the code twice?\n",
    "\n",
    "Do it with a pointer to new DataFrame so you won't lose your results.\n",
    "\n",
    "\n",
    "<details><summary>Answer</summary>\n",
    "<p>\n",
    "\n",
    "```python\n",
    "from pyspark.sql.types import ArrayType, StringType\n",
    "from pyspark.sql.functions import udf\n",
    "\n",
    "def split_and_set(some_str):\n",
    "    if isinstance(some_str, str):\n",
    "        some_str = ''.join(c for c in some_str if c not in \"[](){}<>,'/.\")\n",
    "        return list(set(some_str.split(' ')))\n",
    "    return some_str\n",
    "\n",
    "\n",
    "list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))\n",
    "\n",
    "new_df = df.withColumn('description', list_udf(df['description']))\n",
    "df = new_df\n",
    "```\n",
    "    \n",
    "</p>\n",
    "</details>\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# check out this python code. Run it. What did you get?\n",
    "# will this work for the task?\n",
    "# how do you combine it with UDF?\n",
    "\n",
    "def split_and_set(some_str):\n",
    "    if isinstance(some_str, str):\n",
    "        some_str = ''.join(c for c in some_str if c not in \"[](){}<>,'/.\")\n",
    "        return list(set(some_str.split(' ')))\n",
    "    return some_str\n",
    "\n",
    "some_str = '[csds b lol,a]'\n",
    "\n",
    "split_and_set(split_and_set(some_str))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# your code goes here"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "scrolled": true
   },
   "source": [
    "\n",
    "\n",
    "\n",
    "# Validate yourself and save to Parquet!\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before saving the DataFrame to Parquet, look at a sample of the data to validate it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.limit(5) .toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Happy with the results? Save updated DataFrame to file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# happy with the results? write to file!\n",
    "# run this command\n",
    "df.write.mode('overwrite').parquet(\"final_train_data\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Checkout `final_train_data` libaray on Jupyter Files\n",
    "\n",
    "What file format did you get? **write** it in our chat! "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Well Done! 👏👏👏\n",
    "\n",
    "\n",
    "## You just finished:  Intro to Data Cleaning and Preparation \n",
    "\n",
    "\n",
    "## Next Chapter:  Apache Spark ML - Create train and test set "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
